17.7 其他
本节介绍垃圾回收过程中使用的几种辅助结构。
并行任务框架
parfor关注的是任务分配和调度,其自身不具备执行能力。它将多个任务分组交给多个执行线程,然后在执行过程中重新平衡线程的任务分配,确保整个任务在最短时间内完成。
设置函数parforsetup用相关参数初始化desc状态,并完成任务分组。
parfor.go
func parforsetup(desc*parfor,nthr,n uint32,wait bool,body func(*parfor,uint32)) { desc.body=body // 任务函数 desc.nthr=nthr // 任务线程数量 desc.cnt=n // 任务数量 …
// 任务分组 for i:=range desc.thr{ begin:=uint32(uint64(n) *uint64(i) /uint64(nthr)) end:=uint32(uint64(n) *uint64(i+1) /uint64(nthr)) desc.thr[i].pos=uint64(begin) |uint64(end)<<32 } }
最后的循环语句将n个任务编号平分成nthr份,并将开始和结束位置保存到pos的高低位。以10任务5线程为例,thr[0]分到的任务单元就是[0,2)。
线程须主动调用parfordo来获取任务组,执行body任务函数。
parfor.go
func parfordo(desc*parfor) { // 为每个线程分配一个唯一序号 tid:=xadd(&desc.thrseq,1) -1
// 任务函数 body:=desc.body
// 如果只有单个线程,直接按顺序执行 if desc.nthr==1{ for i:=uint32(0);i<desc.cnt;i++ { body(desc,i) } return }
// 用线程序号提取任务组 me:= &desc.thr[tid] mypos:= &me.pos for{ // 先完成自身任务 for{ // 任务进度:直接累加pos低位的起始位置 pos:=xadd64(mypos,1)
// 未超出任务组边界,执行
begin:=uint32(pos) -1
end:=uint32(pos>>32)
if begin<end{
body(desc,begin)
continue
}
break
}
// 提前完成工作,尝试从其他线程偷取任务
idle:=false
for try:=uint32(0); ;try++ {
// 如多次行窃未果,那么准备打卡下班
if try>desc.nthr*4&& !idle{
idle=true
xadd(&desc.done,1)
}
// 如果其他线程都已完成工作,结束
extra:=uint32(0)
if!idle{
extra=1
}
if desc.done+extra==desc.nthr{
if!idle{
xadd(&desc.done,1)
}
goto exit
}
// 随机挑选一个线程
var begin,end uint32
victim:=fastrand1() % (desc.nthr-1)
if victim>=tid{
victim++
}
victimpos:= &desc.thr[victim].pos
for{
// 检查目标线程的当前任务进度
pos:=atomicload64(victimpos)
begin=uint32(pos)
end=uint32(pos>>32)
if begin+1>=end{
end=0
begin=end
break
}
// 有任务可偷,要忙起来了
if idle{
xadd(&desc.done, -1)
idle=false
}
// 将剩余任务偷一半(后半截)
begin2:=begin+ (end-begin)/2
// 记得修改原主的任务结束值
newpos:=uint64(begin) |uint64(begin2)<<32
if cas64(victimpos,pos,newpos) {
begin=begin2
break
}
}
// 将偷来的任务编号保存到自己pos里
if begin<end{
atomicstore64(mypos,uint64(begin)|uint64(end)<<32)
me.nsteal++
me.nstealcnt+=uint64(end) -uint64(begin)
// 跳出偷窃循环,进入外层循环重新执行自己的任务(尽管是偷来的)
break
}
// 没偷到任务,就暂停或退出 ...
}
} exit: … }
缓存队列
gcWork被设计来保存灰色对象,必须在保证并发安全的前提下,拥有足够高的性能。
mgcwork.go
type gcWork struct{ wbuf wbufptr }
该结构的真正核心是workbuf,gcWork不过是外层包装。workbuf作为无锁栈节点,其自身就是一个缓存容器(数组成员)。
mgcwork.go
type workbufhdr struct{ node lfnode nobj int }
type workbuf struct{ workbufhdr obj[(_WorkbufSize-unsafe.Sizeof(workbufhdr{})) /ptrSize]uintptr }
透过gcWork相关方法,我们可以观察workbuf是如何工作的。
mgc.go
var work struct{ full uint64 //lock-free list of full blocks workbuf empty uint64 //lock-free list of empty blocks workbuf partial uint64 //lock-free list of partially filled blocks workbuf }
mgcwork.go
func(ww*gcWork)put(obj uintptr) { w:= (*gcWork)(noescape(unsafe.Pointer(ww)))
// 从work.empty获取一个workbuf复用 wbuf:=w.wbuf.ptr() if wbuf==nil{ wbuf=getpartialorempty(42) w.wbuf=wbufptrOf(wbuf) }
// 直接将obj保存在workbuf.obj数组 wbuf.obj[wbuf.nobj] =obj wbuf.nobj++
// 如果数组填满,则将该数组移交给work.full // 本地obj=nil,下次put时获取一个复用对象填充 if wbuf.nobj==len(wbuf.obj) { putfull(wbuf,50) w.wbuf=0 } }
func putfull(b*workbuf,entry int) { lfstackpush(&work.full, &b.node) }
这种做法有点像内存分配器的cache,优先操作本地缓存,直到满足某个阈值再与全局交换。这么做,可以保证性能,避免直接操作全局队列;另一方面,从全局获取任务时,总是能一次性拿到一组。
就算是无锁数据结构,使用原子操作也会有性能损耗,尤其是在多核环境下。
这段代码,包括work全局变量,有很多C的影子,看上去有些别扭,完全不是Go的风格。如果是自动代码转换,那么下个版本是不是要对runtime里面很多违和的地方清理一下。
消费完毕的workbuf对象会被放回work.empty,以供复用。
mgcwork.go
func(ww*gcWork)get()uintptr{ w:= (*gcWork)(noescape(unsafe.Pointer(ww)))
// 从work.full获取一个workbuf对象 wbuf:=w.wbuf.ptr() if wbuf==nil{ wbuf=getfull(103) if wbuf==nil{ return 0 } w.wbuf=wbufptrOf(wbuf) }
// 直接从本地workbuf提取 wbuf.nobj— obj:=wbuf.obj[wbuf.nobj]
// 本地workbuf已空,将其放回work.empty供复用 if wbuf.nobj==0{ putempty(wbuf,115) w.wbuf=0 }
return obj }
func putempty(b*workbuf,entry int) { lfstackpush(&work.empty, &b.node) }
至于Free-Lock Stack的实现,也很简单利用CAS(Compare&Swap)指令来实现原子替换操作。这里用Node Pointer+Node.PushCount实现了Double-CAS。
lfstack.go
func lfstackpush(headuint64,nodelfnode) { // 累加计数器 node.pushcnt++
// 利用pointer+pushcnt获得唯一流水号 new:=lfstackPack(node,node.pushcnt)
// 逆向展开流水号,进行错误检查 if node1, _ :=lfstackUnpack(new);node1!=node{ throw(“lfstackpush”) }
// 类似自旋,重试直到成功 for{ // 原子读取原head node流水号(多核) old:=atomicload64(head)
// 将当前node作为head
// 未成功前,这个操作并不影响原stack
node.next=old
// 利用CAS指令替换原head
// 如替换失败,则循环重试
if cas64(head,old,new) {
break
}
} }
func lfstackpop(head*uint64)unsafe.Pointer{ for{ // 原子读取stack head old:=atomicload64(head) if old==0{ return nil }
// 展开流水号,获取pointer
node, _ :=lfstackUnpack(old)
// 利用CAS指令修改stack head
next:=atomicload64(&node.next)
if cas64(head,old,next) {
return unsafe.Pointer(node)
}
} }
如果CAS指令判断的仅是old指针地址,而该地址又被意外重用,那就会造成错误结果,这就是所谓的ABA问题。利用“指针地址+计数器”生成唯一流水号,实现Double-CAS,就能避开。
lfstack_amd64.go
func lfstackPack(node*lfnode,cnt uintptr)uint64{ return uint64(uintptr(unsafe.Pointer(node)))<<16|uint64(cnt&(1<<19-1)) }
内存状态统计
除了用GODEBUG=“gctrace=1”输出垃圾回收状态信息外,某些时候我们还需要自行获取内存相关统计数据。
与之相关的数据结构,分别是运行时内部使用的mstats和面向用户的MemStats。两者大部分结构相同,只是在输出结果上有细微调整。
mstats.go
type mstats struct{ alloc uint64 // 当前分配的object内存(含未回收的白色对象) total_alloc uint64 // 历史累计分配内存(当前正在使用和历次回收释放) sys uint64 // 当前从操作系统获取的内存(所有分配总和,不包括已释放) nmalloc uint64 // 分配次数累计 nfree uint64 // 释放次数累计
heap_alloc uint64 // 同alloc heap_sys uint64 // 从操作系统获取的内存(不包括已释放) heap_idle uint64 // 闲置span内存 heap_inuse uint64 // 正在使用span内存(从heap提取,包括stack) heap_released uint64 // 当前已归还操作系统的内存 heap_objects uint64 // 正在使用object数量(不含闲置链表)
stacks_inuse uint64 // 正在使用stack内存(含stackpool) mspan_inuse uint64 // 正在使用mspan内存 mcache_inuse uint64 // 正在使用mcache内存
next_gc uint64 // 下次垃圾回收阈值 last_gc uint64 // 上次垃圾回收结束时间(UnixNano,不包括并发清理) pause_total_ns uint64// 累计STW暂停时间 pause_ns [256]uint64 // 最近垃圾回收周期里STW暂停时间(循环缓冲区) pause_end [256]uint64// 最近垃圾回收周期里STW暂停结束时间(UnixNano) numgc uint32 // 垃圾回收次数 gc_cpu_fraction float64 //GC所耗CPU时间比例(f*100%)
heap_live uint64 // 自上次回收后堆使用内存(黑色+新分配,不包括白色对象) }
object特指cache分配的小块内存,以及large object,而非实际用户对象。
用户通过runtime.ReadMemStats函数来获取统计数据。
mstats.go
func ReadMemStats(m*MemStats) { stopTheWorld(“read mem stats”)
systemstack(func() { readmemstats_m(m) })
startTheWorld() }
func readmemstats_m(stats*MemStats) { updatememstats(nil)
// 前面部分数据结构相同,直接拷贝 memmove(unsafe.Pointer(stats),unsafe.Pointer(&memstats),sizeof_C_MStats)
// 将栈内存从统计数据剔除,仅显示用户逻辑消耗 stats.StackSys+=stats.StackInuse stats.HeapInuse-=stats.StackInuse stats.HeapSys-=stats.StackInuse }
注意:ReadMemStats会进行STW操作,应控制调用时间和次数。
监控输出示例。
heap_idle heap_released
| |
scvg0:inuse:3,idle:1,sys:5,released:0,consumed:5(MB) | | | heap_inuse heap_sys heap_sys-heap_released